"""Parser plugin interface.

This module defines the parser plugin interface and contains other
related parser support code.

The module is mainly useful for those wanting to write a parser that
can plugin to rdflib. If you are wanting to invoke a parser you likely
want to do so through the Graph class parse method.
"""

from __future__ import annotations

import codecs
import os
import pathlib
import sys
from io import BufferedIOBase, BytesIO, RawIOBase, StringIO, TextIOBase, TextIOWrapper
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    TextIO,
    Union,
    cast,
)
from urllib.parse import urljoin
from urllib.request import Request, url2pathname
from xml.sax import xmlreader

import rdflib.util
from rdflib import __version__
from rdflib._networking import _urlopen
from rdflib.namespace import Namespace
from rdflib.term import URIRef

if TYPE_CHECKING:
    from email.message import Message
    from io import BufferedReader
    from urllib.response import addinfourl

    from typing_extensions import Buffer

    from rdflib.graph import Graph


__all__ = [
    "Parser",
    "InputSource",
    "StringInputSource",
    "URLInputSource",
    "FileInputSource",
    "PythonInputSource",
]


class Parser:
    __slots__ = ()

    def __init__(self):
        pass

    def parse(self, source: InputSource, sink: Graph) -> None:
        pass


class BytesIOWrapper(BufferedIOBase):
    __slots__ = (
        "wrapped",
        "enc_str",
        "text_str",
        "encoding",
        "encoder",
        "has_read1",
        "has_seek",
        "_name",
        "_fileno",
        "_isatty",
        "_leftover",
        "_bytes_per_char",
        "_text_bytes_offset",
    )

    def __init__(self, wrapped: Union[str, StringIO, TextIOBase], encoding="utf-8"):
        super(BytesIOWrapper, self).__init__()
        self.wrapped = wrapped
        self.encoding = encoding
        self.encoder = codecs.getencoder(self.encoding)
        self.enc_str: Union[BytesIO, BufferedIOBase] | None = None
        self.text_str: Union[StringIO, TextIOBase] | None = None
        self.has_read1: bool | None = None
        self.has_seek: bool | None = None
        self._name: str | None = None
        self._fileno: Union[int, BaseException] | None = None
        self._isatty: Union[bool, BaseException] | None = None
        self._leftover: bytes = b""
        self._text_bytes_offset: int = 0
        norm_encoding = encoding.lower().replace("_", "-")
        if norm_encoding in ("utf-8", "utf8", "u8", "cp65001"):
            # utf-8 has a variable number of bytes per character, 1-4
            self._bytes_per_char: int = 1  # assume average of 1 byte per character
        elif norm_encoding in (
            "latin1",
            "latin-1",
            "iso-8859-1",
            "iso8859-1",
            "ascii",
            "us-ascii",
        ):
            # these are all 1-byte-per-character encodings
            self._bytes_per_char = 1
        elif norm_encoding.startswith("utf-16") or norm_encoding.startswith("utf16"):
            # utf-16 has a variable number of bytes per character, 2-3
            self._bytes_per_char = 2  # assume average of 2 bytes per character
        elif norm_encoding.startswith("utf-32") or norm_encoding.startswith("utf32"):
            # utf-32 is fixed length with 4 bytes per character
            self._bytes_per_char = 4
        else:
            # not sure, just assume it is 2 bytes per character
            self._bytes_per_char = 2

    def _init(self):
        name: str | None = None
        if isinstance(self.wrapped, str):
            b, blen = self.encoder(self.wrapped)
            self.enc_str = BytesIO(b)
            name = "string"
        elif isinstance(self.wrapped, TextIOWrapper):
            inner = self.wrapped.buffer
            # type error: TextIOWrapper.buffer cannot be a BytesIOWrapper
            if isinstance(inner, BytesIOWrapper):  # type: ignore[unreachable, unused-ignore]
                raise Exception(
                    "BytesIOWrapper cannot be wrapped in TextIOWrapper, "
                    "then wrapped in another BytesIOWrapper"
                )
            else:
                self.enc_str = cast(BufferedIOBase, inner)
        elif isinstance(self.wrapped, (TextIOBase, StringIO)):
            self.text_str = self.wrapped
        use_stream: Union[BytesIO, StringIO, BufferedIOBase, TextIOBase]
        if self.enc_str is not None:
            use_stream = self.enc_str
        elif self.text_str is not None:
            use_stream = self.text_str
        else:
            raise Exception("No stream to read from")
        if name is None:
            try:
                name = use_stream.name  # type: ignore[union-attr]
            except AttributeError:
                name = "stream"
        self.has_read1 = hasattr(use_stream, "read1")
        try:
            self.has_seek = use_stream.seekable()
        except AttributeError:
            self.has_seek = hasattr(use_stream, "seek")

        self._name = name

    def _check_fileno(self):
        use_stream: Union[BytesIO, StringIO, BufferedIOBase, TextIOBase]
        if self.enc_str is None and self.text_str is None:
            self._init()
        if self.enc_str is not None:
            use_stream = self.enc_str
        elif self.text_str is not None:
            use_stream = self.text_str
        try:
            self._fileno = use_stream.fileno()
        except OSError as e:
            self._fileno = e
        except AttributeError:
            self._fileno = -1

    def _check_isatty(self):
        use_stream: Union[BytesIO, StringIO, BufferedIOBase, TextIOBase]
        if self.enc_str is None and self.text_str is None:
            self._init()
        if self.enc_str is not None:
            use_stream = self.enc_str
        elif self.text_str is not None:
            use_stream = self.text_str
        try:
            self._isatty = use_stream.isatty()
        except OSError as e:
            self._isatty = e
        except AttributeError:
            self._isatty = False

    @property
    def name(self) -> Any:
        if self._name is None:
            self._init()
        return self._name

    @property
    def closed(self) -> bool:
        if self.enc_str is None and self.text_str is None:
            return False
        closed: bool | None = None
        if self.enc_str is not None:
            try:
                closed = self.enc_str.closed
            except AttributeError:
                closed = None
        elif self.text_str is not None:
            try:
                closed = self.text_str.closed
            except AttributeError:
                closed = None
        return False if closed is None else closed

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def truncate(self, size: int | None = None) -> int:
        raise NotImplementedError("Cannot truncate on BytesIOWrapper")

    def isatty(self) -> bool:
        if self._isatty is None:
            self._check_isatty()
        if isinstance(self._isatty, BaseException):
            raise self._isatty
        else:
            return bool(self._isatty)

    def fileno(self) -> int:
        if self._fileno is None:
            self._check_fileno()
        if isinstance(self._fileno, BaseException):
            raise self._fileno
        else:
            return -1 if self._fileno is None else self._fileno

    def close(self):
        if self.enc_str is None and self.text_str is None:
            return
        if self.enc_str is not None:
            try:
                self.enc_str.close()
            except AttributeError:
                pass
        elif self.text_str is not None:
            try:
                self.text_str.close()
            except AttributeError:
                pass

    def flush(self):
        return  # Does nothing on read-only streams

    def _read_bytes_from_text_stream(self, size: int | None = -1, /) -> bytes:
        if TYPE_CHECKING:
            assert self.text_str is not None
        if size is None or size < 0:
            try:
                ret_str: str = self.text_str.read()
            except EOFError:
                ret_str = ""
            ret_encoded, enc_len = self.encoder(ret_str)
            if self._leftover:
                ret_bytes = self._leftover + ret_encoded
                self._leftover = b""
            else:
                ret_bytes = ret_encoded
        elif size == len(self._leftover):
            ret_bytes = self._leftover
            self._leftover = b""
        elif size < len(self._leftover):
            ret_bytes = self._leftover[:size]
            self._leftover = self._leftover[size:]
        else:
            d, m = divmod(size, self._bytes_per_char)
            get_per_loop = int(d) + (1 if m > 0 else 0)
            got_bytes: bytes = self._leftover
            while len(got_bytes) < size:
                try:
                    got_str: str = self.text_str.read(get_per_loop)
                except EOFError:
                    got_str = ""
                if len(got_str) < 1:
                    break
                ret_encoded, enc_len = self.encoder(got_str)
                got_bytes += ret_encoded
            if len(got_bytes) == size:
                self._leftover = b""
                ret_bytes = got_bytes
            else:
                ret_bytes = got_bytes[:size]
                self._leftover = got_bytes[size:]
                del got_bytes
        self._text_bytes_offset += len(ret_bytes)
        return ret_bytes

    def read(self, size: int | None = -1, /) -> bytes:
        """
        Read at most size bytes, returned as a bytes object.

        If the size argument is negative or omitted read until EOF is reached.
        Return an empty bytes object if already at EOF.
        """
        if size is not None and size == 0:
            return b""
        if self.enc_str is None and self.text_str is None:
            self._init()
        if self.enc_str is not None:
            ret_bytes = self.enc_str.read(size)
        else:
            ret_bytes = self._read_bytes_from_text_stream(size)
        return ret_bytes

    def read1(self, size: int | None = -1, /) -> bytes:
        """
        Read at most size bytes, with at most one call to the underlying raw stream’s
        read() or readinto() method. Returned as a bytes object.

        If the size argument is negative or omitted, read until EOF is reached.
        Return an empty bytes object at EOF.
        """
        if (self.enc_str is None and self.text_str is None) or self.has_read1 is None:
            self._init()
        if not self.has_read1:
            raise NotImplementedError()
        if self.enc_str is not None:
            if size is None or size < 0:
                return self.enc_str.read1()
            return self.enc_str.read1(size)
        raise NotImplementedError("read1() not supported for TextIO in BytesIOWrapper")

    def readinto(self, b: Buffer, /) -> int:
        """
        Read len(b) bytes into buffer b.

        Returns number of bytes read (0 for EOF), or error if the object
        is set not to block and has no data to read.
        """
        if TYPE_CHECKING:
            assert isinstance(b, (memoryview, bytearray))
        if len(b) == 0:
            return 0
        if self.enc_str is None and self.text_str is None:
            self._init()
        if self.enc_str is not None:
            return self.enc_str.readinto(b)
        else:
            size = len(b)
            read_data: bytes = self._read_bytes_from_text_stream(size)
            read_len = len(read_data)
            if read_len == 0:
                return 0
            b[:read_len] = read_data
            return read_len

    def readinto1(self, b: Buffer, /) -> int:
        """
        Read len(b) bytes into buffer b, with at most one call to the underlying raw
        stream's read() or readinto() method.

        Returns number of bytes read (0 for EOF), or error if the object
        is set not to block and has no data to read.
        """
        if TYPE_CHECKING:
            assert isinstance(b, (memoryview, bytearray))
        if (self.enc_str is None and self.text_str is None) or self.has_read1 is None:
            self._init()
        if not self.has_read1:
            raise NotImplementedError()
        if self.enc_str is not None:
            return self.enc_str.readinto1(b)
        raise NotImplementedError(
            "readinto1() not supported for TextIO in BytesIOWrapper"
        )

    def seek(self, offset: int, whence: int = 0, /) -> int:
        if self.has_seek is not None and not self.has_seek:
            raise NotImplementedError()
        if (self.enc_str is None and self.text_str is None) or self.has_seek is None:
            self._init()

        if not whence == 0:
            raise NotImplementedError("Only SEEK_SET is supported on BytesIOWrapper")
        if offset != 0:
            raise NotImplementedError(
                "Only seeking to zero is supported on BytesIOWrapper"
            )
        if self.enc_str is not None:
            self.enc_str.seek(offset, whence)
        elif self.text_str is not None:
            self.text_str.seek(offset, whence)
        self._text_bytes_offset = 0
        self._leftover = b""
        return 0

    def seekable(self):
        if (self.enc_str is None and self.text_str is None) or self.has_seek is None:
            self._init()
        return self.has_seek

    def tell(self) -> int:
        if self.has_seek is not None and not self.has_seek:
            raise NotImplementedError("Cannot tell() pos because file is not seekable.")
        if self.enc_str is not None:
            try:
                self._text_bytes_offset = self.enc_str.tell()
            except AttributeError:
                pass
        return self._text_bytes_offset

    def write(self, b, /):
        raise NotImplementedError("Cannot write to a BytesIOWrapper")


class InputSource(xmlreader.InputSource):
    """
    TODO:
    """

    def __init__(self, system_id: str | None = None):
        xmlreader.InputSource.__init__(self, system_id=system_id)
        self.content_type: str | None = None
        self.auto_close = False  # see Graph.parse(), true if opened by us

    def close(self) -> None:
        c = self.getCharacterStream()
        if c and hasattr(c, "close"):
            try:
                c.close()
            except Exception:
                pass
        f = self.getByteStream()
        if f and hasattr(f, "close"):
            try:
                f.close()
            except Exception:
                pass


class PythonInputSource(InputSource):
    """
    Constructs an RDFLib Parser InputSource from a Python data structure,
    for example, loaded from JSON with json.load or json.loads:

    >>> import json
    >>> as_string = \"\"\"{
    ...   "@context" : {"ex" : "http://example.com/ns#"},
    ...   "@graph": [{"@type": "ex:item", "@id": "#example"}]
    ... }\"\"\"
    >>> as_python = json.loads(as_string)
    >>> source = create_input_source(data=as_python)
    >>> isinstance(source, PythonInputSource)
    True
    """

    def __init__(self, data: Any, system_id: str | None = None):
        self.content_type = None
        self.auto_close = False  # see Graph.parse(), true if opened by us
        self.public_id: str | None = None
        self.system_id: str | None = system_id
        self.data = data

    def getPublicId(self) -> str | None:  # noqa: N802
        return self.public_id

    def setPublicId(self, public_id: str | None) -> None:  # noqa: N802
        self.public_id = public_id

    def getSystemId(self) -> str | None:  # noqa: N802
        return self.system_id

    def setSystemId(self, system_id: str | None) -> None:  # noqa: N802
        self.system_id = system_id

    def close(self) -> None:
        self.data = None


class StringInputSource(InputSource):
    """
    Constructs an RDFLib Parser InputSource from a Python String or Bytes
    """

    def __init__(
        self,
        value: str | bytes,
        encoding: str = "utf-8",
        system_id: str | None = None,
    ):
        super(StringInputSource, self).__init__(system_id)
        stream: Union[BinaryIO, TextIO]
        if isinstance(value, str):
            stream = StringIO(value)
            self.setCharacterStream(stream)
            self.setEncoding(encoding)
            b_stream = BytesIOWrapper(value, encoding)
            self.setByteStream(b_stream)
        else:
            stream = BytesIO(value)
            self.setByteStream(stream)
            c_stream = TextIOWrapper(stream, encoding)
            self.setCharacterStream(c_stream)
            self.setEncoding(c_stream.encoding)


headers = {
    "User-agent": "rdflib-%s (https://rdflib.github.io/; eikeon@eikeon.com)"
    % __version__
}


class URLInputSource(InputSource):
    """
    Constructs an RDFLib Parser InputSource from a URL to read it from the Web.
    """

    links: list[str]

    @classmethod
    def getallmatchingheaders(cls, message: Message, name) -> list[str]:
        # This is reimplemented here, because the method
        # getallmatchingheaders from HTTPMessage is broken since Python 3.0
        name = name.lower()
        return [val for key, val in message.items() if key.lower() == name]

    @classmethod
    def get_links(cls, response: addinfourl) -> list[str]:
        linkslines = cls.getallmatchingheaders(response.headers, "Link")
        retarray: list[str] = []
        for linksline in linkslines:
            links = [linkstr.strip() for linkstr in linksline.split(",")]
            for link in links:
                retarray.append(link)
        return retarray

    def get_alternates(self, type_: str | None = None) -> list[str]:
        typestr: str | None = f'type="{type_}"' if type_ else None
        relstr = 'rel="alternate"'
        alts = []
        for link in self.links:
            parts = [p.strip() for p in link.split(";")]
            if relstr not in parts:
                continue
            if typestr:
                if typestr in parts:
                    alts.append(parts[0].strip("<>"))
            else:
                alts.append(parts[0].strip("<>"))
        return alts

    def __init__(self, system_id: str | None = None, format: str | None = None):
        super(URLInputSource, self).__init__(system_id)
        self.url = system_id

        # copy headers to change
        myheaders = dict(headers)
        if format == "xml":
            myheaders["Accept"] = "application/rdf+xml, */*;q=0.1"
        elif format == "n3":
            myheaders["Accept"] = "text/n3, */*;q=0.1"
        elif format in ["turtle", "ttl"]:
            myheaders["Accept"] = "text/turtle, application/x-turtle, */*;q=0.1"
        elif format == "nt":
            myheaders["Accept"] = "text/plain, */*;q=0.1"
        elif format == "trig":
            myheaders["Accept"] = "application/trig, */*;q=0.1"
        elif format == "trix":
            myheaders["Accept"] = "application/trix, */*;q=0.1"
        elif format == "json-ld":
            myheaders["Accept"] = (
                "application/ld+json, application/json;q=0.9, */*;q=0.1"
            )
        else:
            # if format not given, create an Accept header from all registered
            # parser Media Types
            from rdflib.parser import Parser
            from rdflib.plugin import plugins

            acc = []
            for p in plugins(kind=Parser):  # only get parsers
                if "/" in p.name:  # all Media Types known have a / in them
                    acc.append(p.name)

            myheaders["Accept"] = ", ".join(acc)

        req = Request(system_id, None, myheaders)  # type: ignore[arg-type]

        response: addinfourl = _urlopen(req)
        self.url = response.geturl()  # in case redirections took place
        self.links = self.get_links(response)
        if format in ("json-ld", "application/ld+json"):
            alts = self.get_alternates(type_="application/ld+json")
            for link in alts:
                full_link = urljoin(self.url, link)
                if full_link != self.url and full_link != system_id:
                    response = _urlopen(Request(full_link))
                    self.url = response.geturl()  # in case redirections took place
                    break

        self.setPublicId(self.url)
        content_types = self.getallmatchingheaders(response.headers, "content-type")
        self.content_type = content_types[0] if content_types else None
        if self.content_type is not None:
            self.content_type = self.content_type.split(";", 1)[0]
        self.setByteStream(response)
        # TODO: self.setEncoding(encoding)
        self.response_info = response.info()  # a mimetools.Message instance

    def __repr__(self) -> str:
        # type error: Incompatible return value type (got "Optional[str]", expected "str")
        return self.url  # type: ignore[return-value]


class FileInputSource(InputSource):
    def __init__(
        self,
        file: Union[BinaryIO, TextIO, TextIOBase, RawIOBase, BufferedIOBase],
        /,
        encoding: str | None = None,
    ):
        base = pathlib.Path.cwd().as_uri()
        system_id = URIRef(pathlib.Path(file.name).absolute().as_uri(), base=base)  # type: ignore[union-attr]
        super(FileInputSource, self).__init__(system_id)
        self.file = file
        if isinstance(file, TextIOBase):  # Python3 unicode fp
            self.setCharacterStream(file)
            self.setEncoding(file.encoding)
            try:
                b = file.buffer  # type: ignore[attr-defined]
                self.setByteStream(b)
            except (AttributeError, LookupError):
                self.setByteStream(BytesIOWrapper(file, encoding=file.encoding))
        else:
            if TYPE_CHECKING:
                assert isinstance(file, BufferedReader)
            self.setByteStream(file)
            if encoding is not None:
                self.setEncoding(encoding)
                self.setCharacterStream(TextIOWrapper(file, encoding=encoding))
            else:
                # We cannot set characterStream here because
                # we do not know the Raw Bytes File encoding.
                pass

    def __repr__(self) -> str:
        return repr(self.file)


def create_input_source(
    source: (
        Union[IO[bytes], TextIO, InputSource, str, bytes, pathlib.PurePath] | None
    ) = None,
    publicID: str | None = None,  # noqa: N803
    location: str | None = None,
    file: BinaryIO | TextIO | None = None,
    data: str | bytes | dict | None = None,
    format: str | None = None,
) -> InputSource:
    """
    Return an appropriate InputSource instance for the given
    parameters.
    """

    # test that exactly one of source, location, file, and data is not None.
    non_empty_arguments = list(
        filter(
            lambda v: v is not None,
            [source, location, file, data],
        )
    )

    if len(non_empty_arguments) != 1:
        raise ValueError(
            "exactly one of source, location, file or data must be given",
        )

    input_source = None

    if source is not None:
        if TYPE_CHECKING:
            assert file is None
            assert data is None
            assert location is None
        if isinstance(source, InputSource):
            input_source = source
        else:
            if isinstance(source, str):
                location = source
            elif isinstance(source, pathlib.PurePath):
                location = str(source)
            elif isinstance(source, bytes):
                data = source
            elif hasattr(source, "read") and not isinstance(source, Namespace):
                f = source
                input_source = InputSource()
                if hasattr(source, "encoding"):
                    input_source.setCharacterStream(source)  # type: ignore[arg-type]
                    input_source.setEncoding(source.encoding)
                    try:
                        b = source.buffer  # type: ignore[union-attr]
                        input_source.setByteStream(b)
                    except (AttributeError, LookupError):
                        input_source.setByteStream(source)  # type: ignore[arg-type]
                else:
                    input_source.setByteStream(f)  # type: ignore[arg-type]
                if f is sys.stdin:
                    input_source.setSystemId("file:///dev/stdin")
                elif hasattr(f, "name"):
                    input_source.setSystemId(f.name)
            else:
                raise Exception(
                    "Unexpected type '%s' for source '%s'" % (type(source), source)
                )

    absolute_location = None  # Further to fix for issue 130

    auto_close = False  # make sure we close all file handles we open

    if location is not None:
        if TYPE_CHECKING:
            assert file is None
            assert data is None
            assert source is None
        (
            absolute_location,
            auto_close,
            file,
            input_source,
        ) = _create_input_source_from_location(
            file=file,
            format=format,
            input_source=input_source,
            location=location,
        )

    if file is not None:
        if TYPE_CHECKING:
            assert location is None
            assert data is None
            assert source is None
        input_source = FileInputSource(file)

    if data is not None:
        if TYPE_CHECKING:
            assert location is None
            assert file is None
            assert source is None
        if isinstance(data, dict):
            input_source = PythonInputSource(data)
            auto_close = True
        elif isinstance(data, (str, bytes, bytearray)):
            input_source = StringInputSource(data)
            auto_close = True
        else:
            raise RuntimeError(f"parse data can only str, or bytes. not: {type(data)}")

    if input_source is None:
        raise Exception("could not create InputSource")
    else:
        input_source.auto_close |= auto_close
        if publicID is not None:  # Further to fix for issue 130
            input_source.setPublicId(publicID)
        # Further to fix for issue 130
        elif input_source.getPublicId() is None:
            input_source.setPublicId(absolute_location or "")
        return input_source


def _create_input_source_from_location(
    file: BinaryIO | TextIO | None,
    format: str | None,
    input_source: InputSource | None,
    location: str,
) -> tuple[URIRef, bool, BinaryIO | TextIO | None, InputSource | None]:
    # Fix for Windows problem https://github.com/RDFLib/rdflib/issues/145 and
    # https://github.com/RDFLib/rdflib/issues/1430
    # NOTE: using pathlib.Path.exists on a URL fails on windows as it is not a
    # valid path. However os.path.exists() returns false for a URL on windows
    # which is why it is being used instead.
    if os.path.exists(location):
        location = pathlib.Path(location).absolute().as_uri()

    base = pathlib.Path.cwd().as_uri()

    absolute_location = URIRef(rdflib.util._iri2uri(location), base=base)

    if absolute_location.startswith("file:///"):
        filename = url2pathname(absolute_location.replace("file:///", "/"))
        file = open(filename, "rb")
    else:
        input_source = URLInputSource(absolute_location, format)

    auto_close = True
    # publicID = publicID or absolute_location  # Further to fix
    # for issue 130

    return absolute_location, auto_close, file, input_source
